
Conversation

@0oshowero0 (Collaborator) commented Feb 6, 2026

Summary

This PR introduces a High-Level Key-Value (KV) Interface to TransferQueue, offering a Redis-style API that retains access to most of TransferQueue's advanced features.

Background

In previous versions of TransferQueue, the learning curve was relatively steep for new users. To perform basic operations, users had to:

  1. Understand the BatchMeta, SampleMeta, and FieldMeta design (as illustrated in tutorial/03_metadata_concepts.py).
  2. Navigate the flexible but complex TransferQueueClient API.

Although PR #26 simplified the initialization process, the core interaction still exposed low-level details. This PR bridges that gap by providing a familiar, easy-to-use KV abstraction.

TransferQueue API Architecture

With this PR, TransferQueue now supports a two-level API architecture to satisfy different user needs.

| Level | Tier | Style | Fine-Grained Access | Streaming | Sampler | Multiple-Backends |
|-------|------|-------|---------------------|-----------|---------|-------------------|
| High | KV Interface (this PR) | Put/Get/List/Clear | ✅ | ❌ | ❌ | ✅ |
| High | StreamingDataLoader (#23) | PyTorch DataLoader | ❌ | ✅ | ✅ | ✅ |
| Low | TransferQueueClient | Metadata-based | ✅ | ✅ | ✅ | ✅ |

High-Level API

Key-Value based API (This PR)

Methods

  • (async_)kv_put: Insert/Update a multi-column sample by key, with optional metadata tag
  • (async_)kv_batch_put: Put multiple key-value pairs efficiently in batch
  • (async_)kv_batch_get: Retrieve samples (by keys), supporting column selection (by fields)
  • (async_)kv_list: List keys and tags (metadata) in a partition
  • (async_)kv_clear: Remove key-value pairs from storage
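
Each method also has an async_ variant. Below is a minimal sketch of the two calls not covered by the usage example further down (kv_put and kv_clear). It assumes kv_put/kv_clear take singular key/tag counterparts of the keys/tags arguments shown for kv_batch_put; see tutorial/02_kv_interface.py for the authoritative signatures.

import torch
import transfer_queue as tq

tq.init()

# insert one multi-column sample under a single key
# (assumed signature: `key`/`tag` are the singular forms of kv_batch_put's `keys`/`tags`)
tq.kv_put(
    key="1_0",
    partition_id="test",
    fields={"input_ids": torch.tensor([[4, 5, 6]])},
    tag={"status": "running"},
)

# remove the key-value pair from storage
# (assumed to accept the list of keys to drop plus the partition)
tq.kv_clear(keys=["1_0"], partition_id="test")

# the async variants mirror the sync ones, e.g.:
#   await tq.async_kv_put(key="1_0", partition_id="test", fields=..., tag=...)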

Key Features

  • Redis-style Semantics: Familiar KV interface (Put/Get/List) for zero learning curve
  • Fine-grained Access: Update or retrieve specific fields (columns) within a key (row) without a full-row operation
  • Partition Isolation: Logical separation of storage namespaces
  • Metadata Tags: Lightweight metadata for status tracking
  • Pluggable Backends: Supports multiple backends
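
For instance, fine-grained access lets a consumer attach a new column to an existing row without rewriting the other fields. A hedged sketch, assuming kv_put merges new fields into an existing key rather than replacing the whole sample (confirm against tutorial/02_kv_interface.py):

import torch
import transfer_queue as tq

# suppose key "1_0" already holds input_ids/attention_mask (see the usage example below);
# attach only a reward column to that row, assuming kv_put upserts the given fields
# and leaves existing columns untouched
tq.kv_put(
    key="1_0",
    partition_id="test",
    fields={"reward": torch.tensor([[0.92]])},
    tag={"status": "scored"},  # lightweight metadata tag for status tracking
)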

StreamingDataLoader API

Refer to our Roadmap and the related PR (#23).

The usage example can be found in tutorial/06_streaming_dataloader.py.

Low-Level API

Directly manipulate the TransferQueueClient. Refer to tutorial/03_metadata_concepts.py, tutorial/04_understanding_controller.py and tutorial/05_custom_sampler.py for details.

Usage Example

Please refer to tutorial/02_kv_interface.py and tests/e2e/test_kv_interface_e2e.py for details.

import torch
from tensordict import TensorDict
import transfer_queue as tq

# initialize TQ
tq.init()

# prepare data
batch_input_ids = torch.tensor(
    [
        [4, 5, 6],
        [7, 8, 9],
        [10, 11, 12],
        [13, 14, 15],
    ]
)
batch_attention_mask = torch.ones_like(batch_input_ids)

data_batch = TensorDict(
    {
        "input_ids": batch_input_ids,
        "attention_mask": batch_attention_mask,
    },
    batch_size=batch_input_ids.size(0),
)

keys = ["1_0", "1_1", "1_2", "2_0"]  # 4 keys for 4 samples
tags = [{"global_steps": 1, "status": "running", "model_version": 1} for _ in range(len(keys))]
partition_id = "test"
# use kv interface to put into TQ
tq.kv_batch_put(keys=keys, partition_id=partition_id, fields=data_batch, tags=tags)

# list all keys and tags
all_keys, all_tags = tq.kv_list(partition_id=partition_id)
for k, t in zip(all_keys, all_tags, strict=False):
    print(f"    - key='{k}' | tag={t}")

# retrieve all data
retrieved_all = tq.kv_batch_get(keys=all_keys, partition_id=partition_id)
print(f"  Fields: {list(retrieved_all.keys())}")

Use Cases & Limitations

Best For:

  • Scenarios requiring fine-grained data access (e.g., updating a reward score for a specific prompt).
  • Integration with external ReplayBuffers or Single-Controller architectures that manage sample dispatching logic.

Limitations (vs. Streaming/Low-level APIs):

  • No built-in production/consumption tracking: Users must manually check status via tags (see the sketch after this list) or manage the logic externally.
  • No built-in sampler: Data dispatch must be implemented externally, e.g., by a ReplayBuffer or a single controller.
  • Not fully streaming: Consumers typically wait for a controller to dispatch keys before fetching, rather than consuming a continuous stream.
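
To illustrate the first limitation, a consumer can emulate production/consumption tracking by filtering kv_list tags itself. A minimal sketch under the tag schema used in the usage example above:

import transfer_queue as tq

# emulate consumption tracking by hand: list all keys, then filter on the tag
all_keys, all_tags = tq.kv_list(partition_id="test")
ready = [k for k, t in zip(all_keys, all_tags) if t.get("status") == "running"]

if ready:
    batch = tq.kv_batch_get(keys=ready, partition_id="test")
    # after consuming `batch`, the caller must record progress itself,
    # e.g. by re-putting the samples with an updated tag via kv_batch_put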

Copilot AI review requested due to automatic review settings February 6, 2026 15:39
Copilot AI (Contributor) left a comment

Pull request overview

This pull request adds a Key-Value (KV) adapter API to TransferQueue, enabling users to interact with data using string keys instead of BatchMeta objects and global indexes. The feature provides both synchronous and asynchronous APIs for putting, getting, listing, and clearing key-value pairs.

Changes:

  • Adds KV interface API with kv_put, kv_batch_put, kv_get, kv_list, kv_clear and their async variants
  • Modifies BatchMeta.update_custom_meta() API from dict-based (indexed by global_index) to list-based (indexed by position); see the sketch after this list
  • Adds keys_mapping and revert_keys_mapping to DataPartitionStatus for key-to-index translation
  • Adds new ZMQ request types (KV_RETRIEVE_KEYS, KV_LIST) for KV operations
  • Includes comprehensive test coverage for the new KV interface
  • Adds tutorial files demonstrating custom samplers, controller features, and streaming data loading
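
As a sketch of that signature change (hypothetical values; the dict[int, dict] "before" shape and positional list[dict] "after" shape follow the per-file summary below, which also notes that the new API validates the list length against the batch size):

# before: custom_meta keyed by global_index (dict[int, dict])
batch_meta.update_custom_meta({101: {"status": "ok"}, 102: {"status": "ok"}})

# after: a list[dict] aligned with the batch by position; a length mismatch
# with the batch size raises ValueError
batch_meta.update_custom_meta([{"status": "ok"}, {"status": "ok"}])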

Reviewed changes

Copilot reviewed 15 out of 18 changed files in this pull request and generated 20 comments.

| File | Description |
|------|-------------|
| transfer_queue/interface.py | Adds KV API functions (kv_put, kv_get, kv_list, kv_clear) with sync/async variants and helper utility dict_to_tensordict |
| transfer_queue/controller.py | Adds keys_mapping/revert_keys_mapping fields, kv_retrieve_keys method, KV_LIST request handling, and key cleanup logic |
| transfer_queue/client.py | Implements async_kv_retrieve_keys and async_kv_list methods with proper validation and error handling |
| transfer_queue/metadata.py | Changes custom_meta API from dict[int, dict] to list[dict], affecting update_custom_meta and get_all_custom_meta |
| transfer_queue/storage/managers/base.py | Renames custom_meta to custom_backend_meta for clarity |
| transfer_queue/utils/zmq_utils.py | Adds KV_RETRIEVE_KEYS and KV_LIST ZMQ request types |
| transfer_queue/utils/common.py | Adds dict_to_tensordict utility function for converting dicts to TensorDict |
| transfer_queue/__init__.py | Exports new KV API functions |
| tutorial/03_metadata_concepts.py | Updates to use new list-based custom_meta API |
| tutorial/04_understanding_controller.py | New tutorial demonstrating controller features |
| tutorial/05_custom_sampler.py | New tutorial showing custom sampler development |
| tutorial/06_streaming_dataloader.py | New tutorial for streaming data loading |
| tests/test_kv_interface.py | Comprehensive unit tests for all KV interface functions |
| tests/test_controller.py | Tests for controller KV interface functionality |
| tests/test_controller_data_partitions.py | Tests for DataPartitionStatus KV methods |
| tests/test_client.py | Tests for client KV methods and mock KV responses |
| tests/test_metadata.py | Updates tests for new custom_meta API |
| tests/test_kv_storage_manager.py | Updates tests renaming custom_meta to custom_backend_meta |
Comments suppressed due to low confidence (1)

tutorial/03_metadata_concepts.py:213

  • The update_custom_meta call on lines 208-213 only provides 2 items in the list for a batch that contains 5 samples (created on line 193). According to the new API (line 333 in metadata.py), this will raise a ValueError because the length of custom_meta (2) doesn't match the batch size (5). Either provide custom_meta for all 5 samples or adjust the example to create a batch with only 2 samples.


0oshowero0 changed the title from "[feat] Provide KV adapter API" to "[feat] Introduce high-level key-value (KV) interface" on Feb 8, 2026
Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 20 out of 20 changed files in this pull request and generated 12 comments.

Comments suppressed due to low confidence (1)

README.md:168

  • This link still points to tutorial/05_streaming_dataloader.py, but the StreamingDataLoader tutorial file is now tutorial/06_streaming_dataloader.py (and 05 is custom_sampler). The current URL likely 404s; please update it to the new tutorial path/number.
We have experimentally implemented a **standardized, fully-streamed distributed** workflow via TransferQueue. 

By leveraging the `RankAwareSampler` and `StreamingDataLoader` interfaces, we achieve a **streamlined micro-batch-level producer-consumer pipeline**. This design eliminates the need to manually determine data dispatching logic across varying parallelism strategies—a typical complexity in the single-controller paradigm—thereby greatly simplifying framework design. 

Please refer to our [Roadmap](https://github.com/Ascend/TransferQueue/issues/1) and [tutorials/05_streaming_dataloader.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/05_streaming_dataloader.py) for more details.


Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 20 out of 20 changed files in this pull request and generated 10 comments.



Comment on lines 592 to 599
 # TODO(tianyi): the order of custom meta is coupled with keys/values
 for (field_name, global_idx), meta_value in zip(
     itertools.product(sorted(metadata.field_names), metadata.global_indexes),
-    custom_meta,
+    custom_backend_meta,
     strict=True,
 ):
-    per_field_custom_meta[global_idx][field_name] = meta_value
-metadata.update_custom_meta(per_field_custom_meta)
+    per_field_custom_backend_meta[global_idx][field_name] = meta_value
+metadata._custom_backend_meta.update(per_field_custom_backend_meta)
Copilot AI commented Feb 8, 2026

In KVStorageManager.put_data, keys are generated from data.keys() but the per-field custom_backend_meta mapping is zipped against sorted(metadata.field_names). If metadata.field_names is empty or doesn’t match the fields being written (e.g., when inserting brand-new KV keys or adding new columns), this can either make the write a no-op (due to the earlier if not metadata.field_names: return) or raise due to the strict zip/length mismatch. Consider deriving the field iteration order from data.keys() (or updating metadata with data fields before this point) so KV writes work reliably.

0oshowero0 (Collaborator, Author) replied:

@Evelynn-V Please notice this potential issue

    fields = TensorDict(batch, batch_size=[1])
elif not isinstance(fields, TensorDict):
    raise ValueError("field can only be dict or TensorDict")

Copilot AI commented Feb 8, 2026

kv_put retrieves a BatchMeta that can have an empty field_names set (especially for brand-new keys). For the KV storage backend (KVStorageManager), put_data() is a no-op when metadata.field_names is empty and may also mis-handle custom_backend_meta ordering if metadata.field_names doesn’t include the fields being written. Before calling tq_client.put(...), ensure batch_meta is populated with the fields being written (e.g., add the fields to metadata) so inserts/partial updates work across backends.

Suggested change
# Ensure BatchMeta.field_names includes all fields being written so that
# KV backends handle inserts/updates correctly even for brand-new keys.
if hasattr(batch_meta, "field_names"):
    if batch_meta.field_names is None:
        batch_meta.field_names = set()
    try:
        batch_meta.field_names.update(list(fields.keys()))
    except AttributeError:
        # In case field_names is not a set-like container, fall back to assignment.
        batch_meta.field_names = set(fields.keys())

_maybe_create_transferqueue_client(final_conf)


# ==================== Basic API ====================
A contributor left a comment:

Does method init() belong to 'Basic API'?

Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 20 out of 20 changed files in this pull request and generated 6 comments.

Comments suppressed due to low confidence (1)

README.md:168

  • In the Disaggregated Example section, the tutorial reference still points to tutorial/05_streaming_dataloader.py, but this PR renumbers the streaming dataloader tutorial to 06 (and updates other references accordingly). Update this link/text to avoid sending users to the wrong tutorial.
We have experimentally implemented a **standardized, fully-streamed distributed** workflow via TransferQueue. 

By leveraging the `RankAwareSampler` and `StreamingDataLoader` interfaces, we achieve a **streamlined micro-batch-level producer-consumer pipeline**. This design eliminates the need to manually determine data dispatching logic across varying parallelism strategies—a typical complexity in the single-controller paradigm—thereby greatly simplifying framework design. 

Please refer to our [Roadmap](https://github.com/Ascend/TransferQueue/issues/1) and [tutorials/05_streaming_dataloader.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/05_streaming_dataloader.py) for more details.

